In [2]:
# Ship the flightPredict helper modules to the Spark executors (addPyFile also
# makes them importable on the driver), then import them.
# NOTE(review): both files come from the unpinned `master` branch, so results
# can change without any edit to this notebook; consider pinning a commit SHA.
for helperModuleUrl in (
        "https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats/raw/master/flightPredict/training.py",
        "https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats/raw/master/flightPredict/run.py"):
    sc.addPyFile(helperModuleUrl)
del helperModuleUrl
import training
import run
In [3]:
%matplotlib inline
import math
import os
from datetime import datetime
from getpass import getpass

import numpy as np
from dateutil import parser
from numpy import array
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SQLContext
# Spark SQL context shared with the `training` helper module.
sqlContext = SQLContext(sc)
training.sqlContext = sqlContext


def _readSetting(envVar, prompt):
    """Return environment variable *envVar*; if it is unset or empty, prompt for it without echo.

    getpass is used for every prompt so that nothing typed ends up in the
    notebook's saved output.
    """
    return os.environ.get(envVar) or getpass(prompt)


# Connection settings for Cloudant and the Weather Company Data service.
# Credentials are read from the environment (or typed in) instead of being
# stored in the notebook, where they leak through version control and sharing.
# SECURITY: the credentials that were hard-coded in earlier versions of this
# notebook must be considered compromised and rotated.
training.cloudantHost = os.environ.get("CLOUDANT_HOST", "dtaieb.cloudant.com")
training.cloudantUserName = _readSetting("CLOUDANT_USERNAME", "Cloudant user name: ")
training.cloudantPassword = _readSetting("CLOUDANT_PASSWORD", "Cloudant password: ")
training.weatherUrl = _readSetting("WEATHER_URL", "Weather Company Data URL (https://user:password@host): ")
In [4]:
# Load the Cloudant training database into a DataFrame, timing each step.
# The second argument ("training") is presumably the name the data set is
# registered under; the histogram cell queries a `training` table -- TODO confirm.
dbName="pycon_flightpredict_training_set"
%time cloudantdata = training.loadDataSet(dbName,"training")
%time cloudantdata.printSchema()
# Last expression: the record count is displayed as this cell's output.
%time cloudantdata.count()
Out[4]:
In [5]:
# Departure vs. arrival airport temperature across the training data set.
training.scatterPlotForFeatures(
    cloudantdata,
    "departureWeather.temp", "arrivalWeather.temp",
    "Departure Airport Temp", "Arrival Airport Temp",
)
In [6]:
# Departure vs. arrival airport barometric pressure across the training data set.
training.scatterPlotForFeatures(
    cloudantdata,
    "departureWeather.pressure", "arrivalWeather.pressure",
    "Departure Airport Pressure", "Arrival Airport Pressure",
)
In [7]:
# Departure vs. arrival airport wind speed across the training data set.
training.scatterPlotForFeatures(
    cloudantdata,
    "departureWeather.wspd", "arrivalWeather.wspd",
    "Departure Airport Wind Speed", "Arrival Airport Wind Speed",
)
In [8]:
# Labeled points built by the `training` helper from the "training" data set;
# a 5-element sample is displayed as the cell output.
trainingData = training.loadLabeledDataRDD( "training" )
trainingData.take( 5 )
Out[8]:
In [9]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
# Logistic regression on the training points. MLlib cannot train on NaN
# features, so they are replaced by 0.0.
# numClasses is passed explicitly, as the tree models below already do: it
# defaults to 2, and with validateData=False labels outside [0, numClasses)
# would not be reported, only mis-trained.
logRegModel = LogisticRegressionWithLBFGS.train(
    trainingData.map(lambda lp: LabeledPoint(
        lp.label,
        np.fromiter((0.0 if np.isnan(x) else x for x in lp.features.toArray()), dtype=np.double))),
    iterations=1000, validateData=False, intercept=False,
    numClasses=training.getNumClasses())
print(logRegModel)
In [10]:
from pyspark.mllib.classification import NaiveBayes
# NaiveBayes requires non-negative features, so negative values (and NaNs, for
# which `x > 0.0` is False) are set to 0.0 for now.
# Features are kept as doubles. Casting to an integer dtype silently truncates
# fractional values (e.g. 29.92 -> 29), and the `np.int` alias no longer
# exists in NumPy >= 1.24.
modelNaiveBayes = NaiveBayes.train(trainingData.map(lambda lp: LabeledPoint(
    lp.label,
    np.fromiter((x if x > 0.0 else 0.0 for x in lp.features.toArray()), dtype=np.double))))
print(modelNaiveBayes)
In [11]:
from pyspark.mllib.tree import DecisionTree
# Single decision tree on the training points with NaN features mapped to 0.0.
# An empty categoricalFeaturesInfo means every feature is treated as continuous.
modelDecisionTree = DecisionTree.trainClassifier(
    trainingData.map(lambda lp: LabeledPoint(
        lp.label,
        np.fromiter((0.0 if np.isnan(x) else x for x in lp.features.toArray()), dtype=np.double))),
    numClasses=training.getNumClasses(),
    categoricalFeaturesInfo={})
print(modelDecisionTree)
In [12]:
from pyspark.mllib.tree import RandomForest
# Random forest of 100 trees on the training points with NaN features mapped
# to 0.0. The fixed seed makes the forest's random sampling, and therefore the
# metrics computed later, reproducible across runs.
modelRandomForest = RandomForest.trainClassifier(
    trainingData.map(lambda lp: LabeledPoint(
        lp.label,
        np.fromiter((0.0 if np.isnan(x) else x for x in lp.features.toArray()), dtype=np.double))),
    numClasses=training.getNumClasses(), categoricalFeaturesInfo={},
    numTrees=100, seed=42)
print(modelRandomForest)
In [13]:
# Load the held-out Cloudant test database (tagged "test" for the `training`
# helper); its record count is displayed as the cell output.
dbTestName = "pycon_flightpredict_test_set"
testCloudantdata = training.loadDataSet( dbTestName, "test" )
testCloudantdata.count()
Out[13]:
In [14]:
# Build labeled points for the test set and evaluate all four models on them.
# displayConfusionTable presumably makes runMetrics print confusion tables too.
testData = training.loadLabeledDataRDD( "test" )
training.displayConfusionTable = True
training.runMetrics(
    testData,
    modelNaiveBayes,
    modelDecisionTree,
    logRegModel,
    modelRandomForest,
)
In [15]:
# Histogram of training-set departure delays strictly between 12 and 50 minutes.
# `.rdd` is needed on Spark 2.x, where DataFrame.map no longer exists, and is
# also valid on Spark 1.x. Null delays are skipped because comparing None with
# a number raises on Python 3.
rdd = sqlContext.sql("select deltaDeparture from training").rdd\
    .map(lambda row: row.deltaDeparture)\
    .filter(lambda delay: delay is not None and 12 < delay < 50)
print(rdd.count())
# histogram(50) returns (bucket edges, counts): 51 edges for 50 buckets.
histo = rdd.histogram(50)

import matplotlib
import matplotlib.pyplot as plt

bins = [edge for edge in histo[0]]
leftEdges = bins[:-1]
widths = [right - left for left, right in zip(bins[:-1], bins[1:])]

fig = plt.gcf()
figWidth, figHeight = fig.get_size_inches()
fig.set_size_inches((figWidth * 2.5, figHeight * 2))
plt.ylabel('Number of records')
plt.xlabel('Departure delay (minutes)')
plt.title('Histogram of departure delays')
# Each bar starts at its bucket's left edge (align='edge') so it lines up with
# the delay value shown on the matching x tick label.
plt.bar(leftEdges, histo[1], width=widths, align='edge', color='b', label="Bins")
plt.xticks(leftEdges, [int(edge) for edge in leftEdges])
plt.legend()
plt.show()
In [16]:
class customTrainingHandler(training.defaultTrainingHandler):
    """Training handler with three departure-delay classes and weekday features.

    Overrides hooks of ``training.defaultTrainingHandler``; how and when the
    base class calls them is not visible in this notebook.
    """

    # Class index -> human-readable label.
    _classLabels = {
        0: "Delayed less than 13 minutes",
        1: "Delayed between 13 and 41 minutes",
        2: "Delayed 41 minutes or more",
    }

    def getClassLabel(self, value):
        """Return the label for class index *value*, or *value* unchanged if it is not a known class."""
        return self._classLabels.get(int(value), value)

    def numClasses(self):
        """Number of classes produced by computeClassification."""
        return 3

    def computeClassification(self, s):
        """Bucket ``s.deltaDeparture`` (minutes): 0 if < 13, 1 if in [13, 41), 2 if >= 41."""
        if s.deltaDeparture < 13:
            return 0
        if s.deltaDeparture < 41:
            return 1
        return 2

    def customTrainingFeaturesNames(self):
        """Names of the custom features."""
        # NOTE(review): customTrainingFeatures returns 7 values (one per weekday)
        # but only one name is listed here; confirm how the base handler pairs
        # names with feature values.
        return ["departureTime"]

    def customTrainingFeatures(self, s):
        """One-hot encode the weekday of ``s.departureTime`` (Monday=0 ... Sunday=6) as 7 ints."""
        weekday = parser.parse(s.departureTime).weekday()
        return [1 if weekday == day else 0 for day in range(7)]
# Use the 3-class custom handler for every labeled RDD built from here on.
training.customTrainingHandler = customTrainingHandler()

# Reload the training labeled data RDD so it picks up the new classes/features.
trainingData = training.loadLabeledDataRDD("training")

# MLlib cannot train on NaN features: replace them with 0.0 for the models
# that accept any real value.
trainingDataNoNaN = trainingData.map(lambda lp: LabeledPoint(
    lp.label,
    np.fromiter((0.0 if np.isnan(x) else x for x in lp.features.toArray()), dtype=np.double)))
# NaiveBayes requires non-negative features: clamp negatives (and NaNs, for
# which `x > 0.0` is False) to 0.0. Values stay doubles: integer casting would
# truncate fractional features, and `np.int` does not exist in NumPy >= 1.24.
trainingDataNonNegative = trainingData.map(lambda lp: LabeledPoint(
    lp.label,
    np.fromiter((x if x > 0.0 else 0.0 for x in lp.features.toArray()), dtype=np.double)))

# Recompute the models. numClasses must be passed to logistic regression as
# well: it defaults to 2, and with validateData=False the class-2 labels would
# not be rejected, only mis-trained as if the problem were binary.
numDelayClasses = training.getNumClasses()
logRegModel = LogisticRegressionWithLBFGS.train(
    trainingDataNoNaN, iterations=1000, validateData=False, intercept=False,
    numClasses=numDelayClasses)
modelNaiveBayes = NaiveBayes.train(trainingDataNonNegative)
modelDecisionTree = DecisionTree.trainClassifier(
    trainingDataNoNaN, numClasses=numDelayClasses, categoricalFeaturesInfo={})
# Fixed seed so the forest, and the metrics below, are reproducible across runs.
modelRandomForest = RandomForest.trainClassifier(
    trainingDataNoNaN, numClasses=numDelayClasses, categoricalFeaturesInfo={},
    numTrees=100, seed=42)

# Reload the test labeled data with the new handler and recompute the accuracy metrics.
testData = training.loadLabeledDataRDD("test")
training.displayConfusionTable = True
training.runMetrics(testData, modelNaiveBayes, modelDecisionTree, logRegModel, modelRandomForest)
In [17]:
# Hand the four trained models to the `run` helper, then (presumably) score a
# sample BOS -> AUS flight with them.
# NOTE(review): on 2016-05-18 Boston is on EDT (-0400) and Austin on CDT
# (-0500); the -0500 / -0800 offsets below look wrong. Confirm how
# run.runModel interprets them before relying on the result.
run.useModels(
    modelNaiveBayes,
    modelDecisionTree,
    logRegModel,
    modelRandomForest,
)
run.runModel(
    'BOS', "2016-05-18 20:15-0500",
    'AUS', "2016-05-18 22:30-0800",
)
In [ ]: